Setup Methods¶

These cells must be run before any of the other sections below. They define the functions and load the libraries that the later cells depend on.

In [1]:
import plotly.express as px
import pandas as pd

def read_data():
    """Parse the fixed-width STK text export, merge it with the SATCAT
    catalog, and return the merged dataframe of non-decayed satellites.

    Side effects: writes the merged result to 'data/merged.csv'.
    """
    stk_headers = ["id1", "id2", "id3", "country",
                   "sat_type", "id4", "num1", "num2", "num3", "num4",
                   "sat_status", "fetched_date"]
    # Fixed column widths of the converted .txt doc; pandas was having
    # trouble reading this in, so slice each field manually and strip
    # trailing padding.  One (start, end) span per header above.
    col_spans = [(0, 20), (20, 35), (35, 46), (46, 56), (56, 66), (66, 116),
                 (116, 121), (121, 127), (127, 132), (132, 141), (141, 149),
                 (149, None)]
    stk_data_parsed = []
    with open("data/stk.txt") as stk_file_text:
        for line in stk_file_text:
            stk_data_parsed.append(
                [line[start:end].rstrip() for start, end in col_spans])

    # Use `pd` — that is the alias this cell actually imports.
    stk_data = pd.DataFrame(stk_data_parsed, columns=stk_headers)
    sat_data = pd.read_csv("data/SATCAT.csv")
    # Merge datasets on the international designator and filter out decayed satellites.
    merged_data = sat_data.merge(stk_data, how='inner', left_on='INTLDES', right_on='id3')
    merged_data = merged_data[merged_data['DECAY'].isnull()]
    merged_data.to_csv('data/merged.csv')
    return merged_data

def plot_satellites_overlapping(merged_data, x_trade_space_altitudes_km=None):
    """Plot how many satellites' orbits overlap each candidate altitude.

    Args:
        merged_data: dataframe with numeric 'PERIGEE' and 'APOGEE' columns (km).
        x_trade_space_altitudes_km: altitudes (km) to evaluate; defaults to
            0..1995 km in 5 km steps when None.

    Raises:
        ValueError: if an empty altitude list is passed explicitly.
    """
    # Explicit None check: the previous `x or default` idiom silently
    # replaced an intentionally-empty list with the default range.
    if x_trade_space_altitudes_km is None:
        x_trade_space_altitudes_km = list(range(0, 2000, 5))
    if not x_trade_space_altitudes_km:
        raise ValueError("x_trade_space_altitudes_km must be non-empty")
    y_satellites_overlapping = []
    start_x = x_trade_space_altitudes_km[0]
    end_x = x_trade_space_altitudes_km[-1]
    for altitude in x_trade_space_altitudes_km:
        # A satellite overlaps the altitude when it lies within [perigee, apogee].
        sats_within_range = merged_data[(merged_data['PERIGEE'] <= altitude)
                                        & (merged_data['APOGEE'] >= altitude)]
        y_satellites_overlapping.append(len(sats_within_range))

    fig = px.bar(x=x_trade_space_altitudes_km, y=y_satellites_overlapping,
                 title=f'Sats overlapping (y) for altitudes (x) within range of [{start_x}, {end_x}] km',
                 labels={'x': 'Altitude (km)', 'y': 'Number of Overlapping Satellites'})
    fig.show()
In [2]:
import requests
import pandas
import json
import os

SPACETRACK_LOGIN_URL = "https://www.space-track.org/ajaxauth/login"
SPACETRACK_TLE_QUERY_URL = "https://www.space-track.org/basicspacedata/query/class/tle_publish/PUBLISH_EPOCH/%3Enow-30/orderby/PUBLISH_EPOCH%20asc/emptyresult/show"

# By default just use the files we have.  Otherwise fetch the latest TLE's.
# By default just use the files we have.  Otherwise fetch the latest TLE's.
def load_tles(username, password, override=False):
    """Return TLE records as parsed JSON, using the on-disk cache when present.

    Args:
        username, password: space-track.org credentials.
        override: when True, bypass the cache and fetch fresh data.

    Raises:
        RuntimeError: when the query returns an empty result.
    """
    if not override and os.path.exists("data/tles.json"):
        with open("data/tles.json", "r") as tle_json:
            return json.load(tle_json)
    with requests.Session() as session:
        # Log in first; the session carries the auth cookie for the query.
        session.post(SPACETRACK_LOGIN_URL, data={'identity': username, 'password': password})
        # Query for recent TLE's.  (Do not print the response object here —
        # its __dict__ contains the session cookies.)
        result = session.get(SPACETRACK_TLE_QUERY_URL, auth=(username, password))
        tle_data = result.json()
    # Validate BEFORE writing, so an empty response never poisons the cache.
    # (`Error` was an undefined name; RuntimeError is a real exception type.)
    if not tle_data:
        raise RuntimeError("Bad request")
    with open("data/tles.json", "w+") as tle_json:
        json.dump(tle_data, tle_json, indent=4)
    return tle_data


def merge_tles_to_dataframe(data, username, password):
    """Attach the most recently published TLE per NORAD id to the catalog.

    Side effects: prints row counts before/after the merge and writes the
    result to 'data/merged-with-tles.csv'.
    """
    tle_records = load_tles(username, password)
    tle_frame = pandas.DataFrame(tle_records)

    # Work only with objects that have not decayed.
    merged_data = data[data['DECAY'].isnull()]
    print(f"{len(merged_data)} data elements before TLE merge")

    # NORAD_CAT_ID must be int64 to line up with the other .csv, and only the
    # latest published TLE per object should survive: sort newest-first, then
    # take the first row of each NORAD_CAT_ID group.
    tle_frame['NORAD_CAT_ID'] = tle_frame['NORAD_CAT_ID'].astype('int64')
    tle_frame = (tle_frame
                 .sort_values(by=['PUBLISH_EPOCH'], ascending=[False])
                 .groupby('NORAD_CAT_ID', as_index=False)
                 .first())

    # Our complete merged dataframe!
    merged_data = merged_data.merge(tle_frame, how='inner',
                                    left_on='NORAD_CAT_ID',
                                    right_on='NORAD_CAT_ID')
    print(f"{len(merged_data)} data elements after TLE merge")
    merged_data.to_csv("data/merged-with-tles.csv")

    return merged_data
In [3]:
import os
import datetime
import random

import orekit
from orekit.pyhelpers import datetime_to_absolutedate, absolutedate_to_datetime
from org.hipparchus.geometry.euclidean.threed import Vector3D
from org.hipparchus.linear import RealMatrix
from org.orekit.propagation import StateCovariance
from org.orekit.frames import FramesFactory
from org.orekit.orbits import PositionAngleType, CartesianOrbit, KeplerianOrbit, CircularOrbit, EquinoctialOrbit, OrbitType
from org.orekit.ssa.metrics import ProbabilityOfCollision 
from org.orekit.ssa.collision.shorttermencounter.probability.twod import Patera2005
from org.orekit.utils import PVCoordinates
from org.orekit.utils import Constants
from org.hipparchus.linear import MatrixUtils
from org.orekit.utils import IERSConventions, PVCoordinates
from org.hipparchus.linear import MatrixUtils
from orekit.pyhelpers import setup_orekit_curdir
from org.orekit.propagation.analytical.tle import TLE, TLEPropagator
from org.orekit.time import TimeScalesFactory, AbsoluteDate
from datetime import datetime, timedelta
from typing import Any

import subprocess
import multiprocessing
import time

from dataclasses import dataclass

from orethread import handle_task_queue

orekit.pyhelpers.download_orekit_data_curdir()

print("Starting OreKit VM")
vm = orekit.initVM()
setup_orekit_curdir()
print("Done setting up OreKit VM")

def load_from_dataframe(dataframe):
    """Extract (line1, line2) TLE string pairs, one per dataframe row."""
    return [(row['TLE_LINE1'], row['TLE_LINE2'])
            for _, row in dataframe.iterrows()]

def dump_queue(q):
    """Drain every item currently in the queue into a list.

    Pushes a None sentinel and reads until it comes back out, so the queue
    must not contain None items of its own.
    """
    q.put(None)
    drained = []
    for item in iter(q.get, None):
        drained.append(item)
    return drained

def determine_collisions(merged_data, tle1, tle2, time_step_seconds=60.0, collision_threshold_km=5, days_forward=1):
    """Search for possible conjunctions between our satellite and every object
    in merged_data over the next `days_forward` days.

    One task per timestep is queued; worker processes (handle_task_queue from
    orethread) consume tasks and push any detected close approaches onto the
    result queue.  Prints the final count of possible collisions.

    Args:
        merged_data: dataframe with 'TLE_LINE1'/'TLE_LINE2' columns per object.
        tle1, tle2: the two TLE lines of the satellite under study.
        time_step_seconds: propagation step between checked timestamps.
        collision_threshold_km: separation below which a pairing is counted.
        days_forward: how far past 'now' to search.
    """
    # Load TLEs
    sat_tle = (tle1, tle2)  # Replace with our sat actual TLE
    potential_debris_tles = load_from_dataframe(merged_data)  # File containing debris TLEs
    
    # # Initialize propagators
    # sat_propagator = TLEPropagator.selectExtrapolator(sat_tle)
    # debris_propagators = [TLEPropagator.selectExtrapolator(tle) for tle in potential_debris_tles]
    
    # Build the [now, now + days_forward] window as orekit AbsoluteDates.
    # AbsoluteDate takes fractional seconds, so microseconds are folded in.
    ts = TimeScalesFactory.getUTC()
    dt = datetime.now()
    dt2 = dt + timedelta(days=days_forward)
    start_date = AbsoluteDate(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second + dt.microsecond / 1000000.0, ts)
    end_date = AbsoluteDate(dt2.year, dt2.month, dt2.day, dt2.hour, dt2.minute, dt2.second + dt2.microsecond / 1000000.0, ts)

    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    worker_count = 16
    workers = []

    print("Starting collision computations.  This may take some time.")
    # Enqueue one task (a plain datetime) per timestep in the window.
    current_date = start_date
    jobs = 0
    while current_date.compareTo(end_date) <= 0:
        task_queue.put(absolutedate_to_datetime(current_date))
        current_date = current_date.shiftedBy(time_step_seconds)
        jobs += 1

    print(f"Executing {jobs} jobs")
    
    # One None sentinel per worker tells each worker to stop consuming.
    for _ in range(worker_count):
        task_queue.put(None)

    for _ in range(worker_count):
        p = multiprocessing.Process(target=handle_task_queue, args=(task_queue, result_queue, sat_tle, potential_debris_tles, collision_threshold_km))
        workers.append(p)
        p.start()
        
    # NOTE(review): workers are joined before result_queue is drained.  If a
    # run ever produces a very large number of results, the queue's pipe
    # buffer could fill and deadlock the join — confirm result volumes stay
    # small, or drain the queue before joining.
    for p in workers:
        p.join()

    collision_count = len(dump_queue(result_queue))
    print(f"{collision_count} total collisions possible during specified time frame")
Downloading file from: https://gitlab.orekit.org/orekit/orekit-data/-/archive/master/orekit-data-master.zip
Starting OreKit VM
Done setting up OreKit VM
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.

Load Data¶

Run this cell once and then use it for subsequent cells.

In [4]:
# NOTE(review): never commit real credentials — prefer reading these from
# os.environ or getpass.getpass() instead of hardcoding them in the notebook.
spacetrack_username = "" # TODO - Replace
spacetrack_password = "" # TODO - Replace

# Parse/merge the local catalogs, then attach the latest TLE per object.
provided_data = read_data()
merged_tle_data = merge_tles_to_dataframe(provided_data, spacetrack_username, spacetrack_password)
31519 data elements before TLE merge
26985 data elements after TLE merge

Determine number of overlapping sats for given altitudes¶

In [5]:
# Altitudes between n1 and n2 km, incrementing n3 km per iteration.
plot_satellites_overlapping(provided_data, [i for i in range(0, 1000+1, 2)])
plot_satellites_overlapping(provided_data, [i for i in range(0, 10000+1, 20)])
plot_satellites_overlapping(provided_data, [i for i in range(0, 50000+1, 100)])

Determine Collisions¶

After the trade analysis has been done and an orbit has been selected, update this cell with your TLEs and call the propagator.

In [6]:
import math
from org.orekit.propagation.analytical.tle.generation import FixedPointTleGenerationAlgorithm, LeastSquaresTleGenerationAlgorithm
from org.orekit.propagation import SpacecraftState

def create_tle_from_orbital_elements(a, e, i, omega, raan, lv):
    """Fit a TLE to the given Keplerian orbital elements, epoch = now (UTC).

    Args:
        a: semi-major axis (m).
        e: eccentricity.
        i: inclination (rad).
        omega: argument of perigee (rad).
        raan: right ascension of the ascending node (rad).
        lv: true anomaly (rad).

    Returns:
        (line1, line2) strings of the generated TLE.
    """
    # These are just for the template format.  DO NOT CHANGE THESE.
    sat_tle1 = "1 25544U 98067A   25055.73635937  .00029091  00000-0  52662-3 0  9995"
    sat_tle2 = "2 25544  51.6356 143.9899 0006112 310.6589  49.3868 15.49395806497720"

    ts = TimeScalesFactory.getUTC()
    dt = datetime.now()
    # AbsoluteDate takes fractional seconds, so fold the microseconds in.
    date = AbsoluteDate(dt.year, dt.month, dt.day, dt.hour, dt.minute,
                        dt.second + dt.microsecond / 1000000.0, ts)

    inertial_frame = FramesFactory.getEME2000()
    orbit = KeplerianOrbit(a, e, i, omega, raan, lv, PositionAngleType.TRUE,
                           inertial_frame, date, Constants.IAU_2015_NOMINAL_EARTH_GM)

    template_tle = TLE(sat_tle1, sat_tle2)
    # NOTE(review): assumes the constructor argument is the least-squares
    # solver's iteration/precision budget — confirm against the Orekit docs.
    tle_generator = LeastSquaresTleGenerationAlgorithm(1_000_000)

    generated_tle = TLE.stateToTLE(SpacecraftState(orbit), template_tle, tle_generator)

    return generated_tle.getLine1(), generated_tle.getLine2()


# TODO - Replace me with your orbit information
a = 500_000.0  # Semi-major axis in meters
e = 0.000  # Eccentricity
i = math.radians(97.0)  # Inclination in radians
omega = math.radians(90.0)  # Argument of perigee in radians
raan = math.radians(20.0)  # Right ascension of the ascending node in radians
lv = math.radians(0.0)  # True anomaly in radians

# Update with our TLE's.
#sat_tle1, sat_tle2 = create_tle_from_orbital_elements(a, e, i, omega, raan, lv)
#print(sat_tle1)
#print(sat_tle2)

# TODO - replace with your TLE's.
sat_tle1 = "1 25544U 98067A   25055.73635937  .00029091  00000-0  52662-3 0  9995"
sat_tle2 = "2 25544  51.6356 143.9899 0006112 310.6589  49.3868 15.49395806497720"

# Check for any possible collisions within 1 day, with steps of 60 second, with 'collisions' being satellites within 10 km.
# You can modify these parameters as you see fit.  Ideally you would want sub-second time steps over a 2-3 week period, but
# my feeble little macbook won't run that very fast - even though I made it to use multiple CPU cores.
# Normally you would have a larger machine and propagate over a period of weeks, while constantly refreshing orbital data on subsequent runs.
# TLE's are also generally inaccurate and can drift fairly quickly - so while this method is not perfect, it should give a small degree
# of confidence in the trade space.
determine_collisions(merged_tle_data, sat_tle1, sat_tle2, time_step_seconds=60.0, collision_threshold_km=10, days_forward=1)
Starting collision computations.  This may take some time.
Executing 1441 jobs
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to protect stack guard pages failed.
OpenJDK 64-Bit Server VM warning: Attempt to deallocate stack guard pages failed.
Thread: 83de7f0c-b582-4b92-8c4d-3981fca78322 has processed 100 jobs.
Thread: eaffd000-36ca-49e4-aac8-57abf784cd31 has processed 100 jobs.
Thread: aea6cec5-5315-4fe0-ba1e-76dceb15b5bd has processed 100 jobs.
0 total collisions possible during specified time frame

Categorize Debris¶

This is just a cell to categorize the debris in the selected orbit altitude and then plot the types of debris.

In [7]:
def categorize_debris_at_altitude(altitude_km, merged_data):
    """Count and plot the object categories whose orbits span an altitude.

    Args:
        altitude_km: altitude (km) to inspect.
        merged_data: dataframe with 'PERIGEE', 'APOGEE', 'DECAY',
            'OBJECT_TYPE' and 'sat_status' columns.
    """
    # Keep objects whose orbit band [perigee, apogee] covers this altitude.
    sats_within_range = merged_data[(merged_data['PERIGEE'] <= altitude_km)
                                    & (merged_data['APOGEE'] >= altitude_km)]
    # Filter out decayed satellites.
    sats_within_range = sats_within_range[sats_within_range['DECAY'].isnull()]

    # Categories: rocket body, debris, and active/inactive/unknown payloads.
    # The payload mask is hoisted since three of the counts reuse it.
    is_payload = sats_within_range['OBJECT_TYPE'] == "PAYLOAD"
    rocket_body_count = len(sats_within_range[sats_within_range['OBJECT_TYPE'] == "ROCKET BODY"])
    debris_count = len(sats_within_range[sats_within_range['OBJECT_TYPE'] == "DEBRIS"])
    active_satellites = len(sats_within_range[is_payload & (sats_within_range['sat_status'] == "Active")])
    inactive_satellites = len(sats_within_range[is_payload & (sats_within_range['sat_status'] == "Inactive")])
    unknown_satellites = len(sats_within_range[is_payload & (sats_within_range['sat_status'] == "Unknown")])

    keys = ["Rocket Body", "Debris", "Active Sat", "Inactive Sat", "Unknown Sat"]
    values = [rocket_body_count, debris_count, active_satellites, inactive_satellites, unknown_satellites]
    print([f"{key}:{value}" for key, value in zip(keys, values)])
    fig = px.bar(x=keys,
                 y=values,
                 title=f'Sats overlapping (y) of altitude {altitude_km}km categorized',
                 labels={'x': 'Category', 'y': 'Number of Overlapping Satellites'})
    fig.show()

categorize_debris_at_altitude(altitude_km=275, merged_data=provided_data)
['Rocket Body:192', 'Debris:89', 'Active Sat:15', 'Inactive Sat:14', 'Unknown Sat:15']
In [ ]: